package org.apache.solr.handler.dataimport;

import static org.apache.solr.handler.dataimport.DataImportHandlerException.SEVERE;
import static org.apache.solr.handler.dataimport.DataImportHandlerException.wrapAndThrow;
import static org.apache.solr.handler.dataimport.DocBuilder.loadClass;
import static org.apache.solr.handler.dataimport.config.ConfigNameConstants.CLASS;

import java.io.IOException;
import java.io.StringReader;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;

import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;

import org.apache.commons.io.IOUtils;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.XMLErrorLogger;
import org.apache.solr.core.SolrCore;
import org.apache.solr.handler.dataimport.config.ConfigNameConstants;
import org.apache.solr.handler.dataimport.config.ConfigParseUtil;
import org.apache.solr.handler.dataimport.config.DIHConfiguration;
import org.apache.solr.handler.dataimport.config.Entity;
import org.apache.solr.handler.dataimport.config.PropertyWriter;
import org.apache.solr.handler.dataimport.config.Script;
import org.apache.solr.schema.IndexSchema;
import org.apache.solr.util.SystemIdResolver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;
import org.xml.sax.InputSource;

/**
 * A {@link DataImporter} wrapper whose only purpose is to make the processed
 * records available once an import finishes (implemented in
 * {@link RecordableDocBuilder}). All behavior is identical to the stock
 * importer, except that {@link #doDeltaImport} initializes the
 * {@link #docBuilder} field with a {@link RecordableDocBuilder} instead of a
 * plain {@link DocBuilder}.
 *
 * @author Gex
 */
public class RecordableDataImport extends DataImporter {

	private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
	private static final XMLErrorLogger XMLLOG = new XMLErrorLogger(LOG);

	// Current importer state: IDLE, RUNNING_FULL_DUMP or RUNNING_DELTA_DUMP.
	private Status status = Status.IDLE;
	private DIHConfiguration config;
	private Date indexStartTime;
	// General-purpose key/value store shared with the DocBuilder (see store/retrieve).
	private Properties store = new Properties();
	// Per-request datasource properties; take precedence over the configured ones.
	private Map<String, Map<String, String>> requestLevelDataSourceProps = new HashMap<>();
	private IndexSchema schema;
	public DocBuilder docBuilder;
	public DocBuilder.Statistics cumulativeStatistics = new DocBuilder.Statistics();
	private SolrCore core;
	private Map<String, Object> coreScopeSession = new ConcurrentHashMap<>();
	// Guards against concurrent imports; doubles as the "busy" indicator.
	private ReentrantLock importLock = new ReentrantLock();
	// Set while loading config if any entity declares a deltaQuery.
	private boolean isDeltaImportSupported = false;
	private final String handlerName;

	/**
	 * Only for testing purposes
	 */
	RecordableDataImport() {
		this.handlerName = "dataimport";
	}

	RecordableDataImport(SolrCore core, String handlerName) {
		this.handlerName = handlerName;
		this.core = core;
		this.schema = core.getLatestSchema();
	}

	/**
	 * Reloads the DIH configuration (from inline text or a config file) and the
	 * request-level datasource properties, if no other import holds the lock.
	 *
	 * @return true if a configuration or datasource definition was (re)loaded;
	 *         false if nothing was loaded or another import is in progress.
	 */
	@SuppressWarnings("rawtypes")
	boolean maybeReloadConfiguration(RequestInfo params, NamedList<?> defaultParams) throws IOException {
		if (importLock.tryLock()) {
			boolean success = false;
			try {
				if (null != params.getRequest()) {
					if (schema != params.getRequest().getSchema()) {
						schema = params.getRequest().getSchema();
					}
				}
				String dataConfigText = params.getDataConfig();
				String dataconfigFile = params.getConfigFile();
				InputSource is = null;
				// Inline config text wins over a configured file.
				if (dataConfigText != null && dataConfigText.length() > 0) {
					is = new InputSource(new StringReader(dataConfigText));
				} else if (dataconfigFile != null) {
					is = new InputSource(core.getResourceLoader().openResource(dataconfigFile));
					is.setSystemId(SystemIdResolver.createSystemIdFromResourceName(dataconfigFile));
					LOG.info("Loading DIH Configuration: {}", dataconfigFile);
				}
				if (is != null) {
					config = loadDataConfig(is);
					success = true;
				}

				Map<String, Map<String, String>> dsProps = new HashMap<>();
				if (defaultParams != null) {
					int position = 0;
					while (position < defaultParams.size()) {
						if (defaultParams.getName(position) == null) {
							break;
						}
						String name = defaultParams.getName(position);
						if (name.equals("datasource")) {
							success = true;
							NamedList dsConfig = (NamedList) defaultParams.getVal(position);
							LOG.info("Getting configuration for Global Datasource...");
							Map<String, String> props = new HashMap<>();
							for (int i = 0; i < dsConfig.size(); i++) {
								props.put(dsConfig.getName(i), dsConfig.getVal(i).toString());
							}
							LOG.info("Adding properties to datasource: {}", props);
							dsProps.put((String) dsConfig.get("name"), props);
						}
						position++;
					}
				}
				requestLevelDataSourceProps = Collections.unmodifiableMap(dsProps);
			} finally {
				importLock.unlock();
			}
			return success;
		} else {
			return false;
		}
	}

	public String getHandlerName() {
		return handlerName;
	}

	public IndexSchema getSchema() {
		return schema;
	}

	/**
	 * Used by tests
	 */
	public void loadAndInit(String configStr) {
		config = loadDataConfig(new InputSource(new StringReader(configStr)));
	}

	public void loadAndInit(InputSource configFile) {
		config = loadDataConfig(configFile);
	}

	/**
	 * Parses the DIH configuration XML and records whether delta imports are
	 * supported (i.e. some entity declares a deltaQuery attribute).
	 *
	 * @throws DataImportHandlerException (SEVERE) if the config cannot be parsed.
	 */
	public DIHConfiguration loadDataConfig(InputSource configFile) {

		DIHConfiguration dihcfg = null;
		try {
			DocumentBuilderFactory dbf = DocumentBuilderFactory.newInstance();

			// only enable xinclude, if a SolrCore and SystemId is present
			// (makes no sense otherwise)
			if (core != null && configFile.getSystemId() != null) {
				try {
					dbf.setXIncludeAware(true);
					dbf.setNamespaceAware(true);
				} catch (UnsupportedOperationException e) {
					LOG.warn("XML parser doesn't support XInclude option");
				}
			}

			DocumentBuilder builder = dbf.newDocumentBuilder();
			if (core != null)
				builder.setEntityResolver(new SystemIdResolver(core.getResourceLoader()));
			builder.setErrorHandler(XMLLOG);
			Document document;
			try {
				document = builder.parse(configFile);
			} finally {
				// some XML parsers are broken and don't close the byte stream
				// (but they should according to spec)
				IOUtils.closeQuietly(configFile.getByteStream());
			}

			dihcfg = readFromXml(document);
			LOG.info("Data Configuration loaded successfully");
		} catch (Exception e) {
			throw new DataImportHandlerException(SEVERE, "Data Config problem: " + e.getMessage(), e);
		}
		for (Entity e : dihcfg.getEntities()) {
			if (e.getAllAttributes().containsKey(SqlEntityProcessor.DELTA_QUERY)) {
				isDeltaImportSupported = true;
				break;
			}
		}
		return dihcfg;
	}

	/**
	 * Builds a {@link DIHConfiguration} from the parsed XML document: reads the
	 * document, script, function, dataSource and propertyWriter elements.
	 *
	 * @throws DataImportHandlerException (SEVERE) on structural problems, e.g. a
	 *         missing root node or missing mandatory attributes.
	 */
	public DIHConfiguration readFromXml(Document xmlDocument) {
		@SuppressWarnings("unused")
		DIHConfiguration config;
		List<Map<String, String>> functions = new ArrayList<>();
		Script script = null;
		Map<String, Map<String, String>> dataSources = new HashMap<>();

		NodeList dataConfigTags = xmlDocument.getElementsByTagName("dataConfig");
		if (dataConfigTags == null || dataConfigTags.getLength() == 0) {
			throw new DataImportHandlerException(SEVERE, "the root node '<dataConfig>' is missing");
		}
		Element e = (Element) dataConfigTags.item(0);
		List<Element> documentTags = ConfigParseUtil.getChildNodes(e, "document");
		if (documentTags.isEmpty()) {
			throw new DataImportHandlerException(SEVERE,
					"DataImportHandler " + "configuration file must have one <document> node.");
		}

		List<Element> scriptTags = ConfigParseUtil.getChildNodes(e, ConfigNameConstants.SCRIPT);
		if (!scriptTags.isEmpty()) {
			script = new Script(scriptTags.get(0));
		}

		// Add the provided evaluators
		List<Element> functionTags = ConfigParseUtil.getChildNodes(e, ConfigNameConstants.FUNCTION);
		if (!functionTags.isEmpty()) {
			for (Element element : functionTags) {
				String func = ConfigParseUtil.getStringAttribute(element, NAME, null);
				String clz = ConfigParseUtil.getStringAttribute(element, ConfigNameConstants.CLASS, null);
				if (func == null || clz == null) {
					throw new DataImportHandlerException(SEVERE,
							"<function> must have a 'name' and 'class' attributes");
				} else {
					functions.add(ConfigParseUtil.getAllAttributes(element));
				}
			}
		}
		List<Element> dataSourceTags = ConfigParseUtil.getChildNodes(e, ConfigNameConstants.DATA_SRC);
		if (!dataSourceTags.isEmpty()) {
			for (Element element : dataSourceTags) {
				Map<String, String> p = new HashMap<>();
				HashMap<String, String> attrs = ConfigParseUtil.getAllAttributes(element);
				for (Map.Entry<String, String> entry : attrs.entrySet()) {
					p.put(entry.getKey(), entry.getValue());
				}
				dataSources.put(p.get("name"), p);
			}
		}
		// If no unnamed (default) datasource exists, promote an arbitrary one.
		if (dataSources.get(null) == null) {
			for (Map<String, String> properties : dataSources.values()) {
				dataSources.put(null, properties);
				break;
			}
		}
		PropertyWriter pw = null;
		List<Element> propertyWriterTags = ConfigParseUtil.getChildNodes(e, ConfigNameConstants.PROPERTY_WRITER);
		if (propertyWriterTags.isEmpty()) {
			// No explicit writer configured: pick ZK-backed writer in cloud mode.
			boolean zookeeper = false;
			if (this.core != null && this.core.getCoreDescriptor().getCoreContainer().isZooKeeperAware()) {
				zookeeper = true;
			}
			pw = new PropertyWriter(zookeeper ? "ZKPropertiesWriter" : "SimplePropertiesWriter",
					Collections.<String, String> emptyMap());
		} else if (propertyWriterTags.size() > 1) {
			throw new DataImportHandlerException(SEVERE,
					"Only one " + ConfigNameConstants.PROPERTY_WRITER + " can be configured.");
		} else {
			Element pwElement = propertyWriterTags.get(0);
			String type = null;
			Map<String, String> params = new HashMap<>();
			for (Map.Entry<String, String> entry : ConfigParseUtil.getAllAttributes(pwElement).entrySet()) {
				if (TYPE.equals(entry.getKey())) {
					type = entry.getValue();
				} else {
					params.put(entry.getKey(), entry.getValue());
				}
			}
			if (type == null) {
				throw new DataImportHandlerException(SEVERE,
						"The " + ConfigNameConstants.PROPERTY_WRITER + " element must specify " + TYPE);
			}
			pw = new PropertyWriter(type, params);
		}
		return new DIHConfiguration(documentTags.get(0), this, functions, script, dataSources, pw);
	}

	/**
	 * Instantiates and initializes the configured {@link DIHProperties} writer.
	 *
	 * @throws DataImportHandlerException (SEVERE) if the writer class cannot be
	 *         loaded, constructed or initialized.
	 */
	@SuppressWarnings("unchecked")
	private DIHProperties createPropertyWriter() {
		DIHProperties propWriter = null;
		PropertyWriter configPw = config.getPropertyWriter();
		try {
			Class<DIHProperties> writerClass = DocBuilder.loadClass(configPw.getType(), this.core);
			// getDeclaredConstructor().newInstance() instead of the deprecated
			// Class.newInstance(): same behavior here, every failure is caught below.
			propWriter = writerClass.getDeclaredConstructor().newInstance();
			propWriter.init(this, configPw.getParameters());
		} catch (Exception e) {
			throw new DataImportHandlerException(DataImportHandlerException.SEVERE,
					"Unable to load PropertyWriter implementation: " + configPw.getType(), e);
		}
		return propWriter;
	}

	public DIHConfiguration getConfig() {
		return config;
	}

	Date getIndexStartTime() {
		return indexStartTime;
	}

	void setIndexStartTime(Date indextStartTime) {
		this.indexStartTime = indextStartTime;
	}

	void store(Object key, Object value) {
		store.put(key, value);
	}

	Object retrieve(Object key) {
		return store.get(key);
	}

	/**
	 * Resolves and initializes the {@link DataSource} for an entity. Lookup
	 * order: request-level props by name, configured props by name, request-level
	 * default (null key), configured default. Defaults to {@link JdbcDataSource}
	 * when no type is specified.
	 *
	 * @throws DataImportHandlerException (SEVERE) if no datasource is found, the
	 *         type is invalid, or initialization fails.
	 */
	@SuppressWarnings("rawtypes")
	public DataSource getDataSourceInstance(Entity key, String name, Context ctx) {
		Map<String, String> p = requestLevelDataSourceProps.get(name);
		if (p == null)
			p = config.getDataSources().get(name);
		if (p == null)
			p = requestLevelDataSourceProps.get(null);// for default data source
		if (p == null)
			p = config.getDataSources().get(null);
		if (p == null)
			throw new DataImportHandlerException(SEVERE,
					"No dataSource :" + name + " available for entity :" + key.getName());
		String type = p.get(TYPE);
		DataSource dataSrc = null;
		if (type == null) {
			dataSrc = new JdbcDataSource();
		} else {
			try {
				dataSrc = (DataSource) DocBuilder.loadClass(type, getCore()).getDeclaredConstructor().newInstance();
			} catch (Exception e) {
				wrapAndThrow(SEVERE, e, "Invalid type for data source: " + type);
			}
		}
		try {
			Properties copyProps = new Properties();
			copyProps.putAll(p);
			Map<String, Object> map = ctx.getRequestParameters();
			// Cap the row count at start+rows so paging requests don't over-fetch.
			if (map.containsKey("rows")) {
				int rows = Integer.parseInt((String) map.get("rows"));
				if (map.containsKey("start")) {
					rows += Integer.parseInt((String) map.get("start"));
				}
				copyProps.setProperty("maxRows", String.valueOf(rows));
			}
			dataSrc.init(ctx, copyProps);
		} catch (Exception e) {
			wrapAndThrow(SEVERE, e, "Failed to initialize DataSource: " + key.getDataSourceName());
		}
		return dataSrc;
	}

	public Status getStatus() {
		return status;
	}

	public void setStatus(Status status) {
		this.status = status;
	}

	public boolean isBusy() {
		return importLock.isLocked();
	}

	/**
	 * Runs a full import with a plain {@link DocBuilder}. Errors are logged and
	 * reported through the DocBuilder rather than propagated.
	 */
	public void doFullImport(DIHWriter writer, RequestInfo requestParams) {
		LOG.info("Starting Full Import");
		setStatus(Status.RUNNING_FULL_DUMP);
		try {
			DIHProperties dihPropWriter = createPropertyWriter();
			setIndexStartTime(dihPropWriter.getCurrentTimestamp());
			docBuilder = new DocBuilder(this, writer, dihPropWriter, requestParams);
			checkWritablePersistFile(writer, dihPropWriter);
			docBuilder.execute();
			if (!requestParams.isDebug())
				cumulativeStatistics.add(docBuilder.importStatistics);
		} catch (Exception e) {
			SolrException.log(LOG, "Full Import failed", e);
			// docBuilder is still null if createPropertyWriter() threw — guard
			// against a secondary NPE masking the original failure.
			if (docBuilder != null) {
				docBuilder.handleError("Full Import failed", e);
			}
		} finally {
			setStatus(Status.IDLE);
			DocBuilder.INSTANCE.set(null);
		}

	}

	private void checkWritablePersistFile(DIHWriter writer, DIHProperties dihPropWriter) {
		if (isDeltaImportSupported && !dihPropWriter.isWritable()) {
			throw new DataImportHandlerException(SEVERE,
					"Properties is not writable. Delta imports are supported by data config but will not work.");
		}
	}

	/**
	 * Runs a delta import. This is the one behavioral difference from
	 * {@link DataImporter}: the builder is a {@link RecordableDocBuilder}, which
	 * records the processed data for later retrieval.
	 */
	public void doDeltaImport(DIHWriter writer, RequestInfo requestParams) {
		LOG.info("Starting Delta Import");
		setStatus(Status.RUNNING_DELTA_DUMP);
		try {
			DIHProperties dihPropWriter = createPropertyWriter();
			setIndexStartTime(dihPropWriter.getCurrentTimestamp());
			docBuilder = new RecordableDocBuilder(this, writer, dihPropWriter, requestParams);
			checkWritablePersistFile(writer, dihPropWriter);
			docBuilder.execute();
			if (!requestParams.isDebug())
				cumulativeStatistics.add(docBuilder.importStatistics);
		} catch (Exception e) {
			LOG.error("Delta Import Failed", e);
			// docBuilder is still null if createPropertyWriter() threw — guard
			// against a secondary NPE masking the original failure.
			if (docBuilder != null) {
				docBuilder.handleError("Delta Import Failed", e);
			}
		} finally {
			setStatus(Status.IDLE);
			DocBuilder.INSTANCE.set(null);
		}

	}

	public void runAsync(final RequestInfo reqParams, final DIHWriter sw) {
		new Thread() {
			@Override
			public void run() {
				runCmd(reqParams, sw);
			}
		}.start();
	}

	/**
	 * Dispatches a DIH command (abort / full-import / import / delta-import).
	 * Only one import may run at a time; a concurrent request is logged and
	 * dropped.
	 */
	void runCmd(RequestInfo reqParams, DIHWriter sw) {
		String command = reqParams.getCommand();
		if (command.equals(ABORT_CMD)) {
			if (docBuilder != null) {
				docBuilder.abort();
			}
			return;
		}
		if (!importLock.tryLock()) {
			LOG.warn("Import command failed . another import is running");
			return;
		}
		try {
			if (FULL_IMPORT_CMD.equals(command) || IMPORT_CMD.equals(command)) {
				doFullImport(sw, reqParams);
			} else if (command.equals(DELTA_IMPORT_CMD)) {
				doDeltaImport(sw, reqParams);
			}
		} finally {
			importLock.unlock();
		}
	}

	@SuppressWarnings("rawtypes")
	Map<String, String> getStatusMessages() {
		// this map object is a Collections.synchronizedMap(new
		// LinkedHashMap()). if we
		// synchronize on the object it must be safe to iterate through the map
		Map statusMessages = (Map) retrieve(STATUS_MSGS);
		Map<String, String> result = new LinkedHashMap<>();
		if (statusMessages != null) {
			synchronized (statusMessages) {
				for (Object o : statusMessages.entrySet()) {
					Map.Entry e = (Map.Entry) o;
					// the toString is taken because some of the Objects create
					// the data lazily when toString() is called
					result.put((String) e.getKey(), e.getValue().toString());
				}
			}
		}
		return result;

	}

	public DocBuilder getDocBuilder() {
		return docBuilder;
	}

	public DocBuilder getDocBuilder(DIHWriter writer, RequestInfo requestParams) {
		DIHProperties dihPropWriter = createPropertyWriter();
		return new DocBuilder(this, writer, dihPropWriter, requestParams);
	}

	Map<String, Evaluator> getEvaluators() {
		return getEvaluators(config.getFunctions());
	}

	/**
	 * used by tests.
	 */
	Map<String, Evaluator> getEvaluators(List<Map<String, String>> fn) {
		Map<String, Evaluator> evaluators = new HashMap<>();
		evaluators.put(Evaluator.DATE_FORMAT_EVALUATOR, new DateFormatEvaluator());
		evaluators.put(Evaluator.SQL_ESCAPE_EVALUATOR, new SqlEscapingEvaluator());
		evaluators.put(Evaluator.URL_ENCODE_EVALUATOR, new UrlEvaluator());
		evaluators.put(Evaluator.ESCAPE_SOLR_QUERY_CHARS, new SolrQueryEscapingEvaluator());
		SolrCore core = docBuilder == null ? null : docBuilder.dataImporter.getCore();
		for (Map<String, String> map : fn) {
			try {
				evaluators.put(map.get(NAME),
						(Evaluator) loadClass(map.get(CLASS), core).getDeclaredConstructor().newInstance());
			} catch (Exception e) {
				wrapAndThrow(SEVERE, e, "Unable to instantiate evaluator: " + map.get(CLASS));
			}
		}
		return evaluators;
	}

	// Per-thread query counter, used by DIH components to track issued queries.
	static final ThreadLocal<AtomicLong> QUERY_COUNT = new ThreadLocal<AtomicLong>() {
		@Override
		protected AtomicLong initialValue() {
			return new AtomicLong();
		}
	};

	public SolrCore getCore() {
		return core;
	}

	void putToCoreScopeSession(String key, Object val) {
		coreScopeSession.put(key, val);
	}

	Object getFromCoreScopeSession(String key) {
		return coreScopeSession.get(key);
	}

}
